#include "remm_thread.h"


void* remm_process_thread_work(void* arg) {
	lsv_s32_t i = 0;
	lsv_s32_t err = 0;
	lsv_thread_t *thread = NULL;
        lsv_volume_proto_t *volume_proto = NULL;
	lsv_remm_info *remm_info = NULL;
        
	thread = (lsv_thread_t *) arg;
	volume_proto = (lsv_volume_proto_t *) thread->work_info;
	remm_info = (lsv_remm_info *)volume_proto->remm_info;
	
        thread->is_running = 1;
        sem_post(&thread->startsem);
      	DINFO("create remm thread: %u, id: %d\n", thread->seqno, remm_info->lsv_remm_id);

        while (1) {
		thread->is_working = 1;
		if(LSV_REMM_SLAVE == ((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_id){
			remm_receive_process_loop(volume_proto);
		}else if(LSV_REMM_MASTER == ((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_id){
			remm_batch_send_by_remm(volume_proto);
		}

		thread->is_working = 0;
		if (thread->to_shutdown) {
			goto EXIT;
		}

        }
EXIT: 
	thread->is_working = 0;
	thread->is_running = 0;
	sem_post(&thread->stopsem);
	DINFO("remm thread leave:%u\n", thread->seqno);
	return NULL;
}

lsv_s32_t remm_sync_record_work(lsv_volume_proto_t * volume_proto){
	lsv_remm_info *remm_info = NULL;
	struct list_head * index = NULL;
	lsv_remm_block *remm_block = NULL;
	lsv_s32_t err = 0;
	lsv_s32_t i = 0;

	remm_info = (lsv_remm_info *)volume_proto->remm_info;
	
	lsv_wrlock(&list_rwlock);
	
	if(!list_empty(&remm_info->remm_list)){
		list_for_each(index, &remm_info->remm_list){
			remm_block = (lsv_remm_block *)index;
			for(i = 0; i < LSV_CHUNK_SIZE / LSV_PAGE_SIZE; i++){
				if(0 == (remm_block->dirty_bitmap[i/8] & (1 << (8 - 1 - i % 8)))){
					continue;
				}
				err = lsv_volume_chunk_update(volume_proto, 
				remm_block->chunk_id,
				remm_block->chunk_buf + i * LSV_PAGE_SIZE,
				LSV_PAGE_SIZE * i,
				LSV_PAGE_SIZE);
				if(err){
					DERROR("remm record info sync into disk failed, err:%d\n", err);
					err = -1;
					goto EXIT;					
				}
				remm_block->dirty_bitmap[i/8] |= ~(1 << (8 - 1 - i % 8));	
			}
		}
	}


EXIT:
	lsv_rwunlock(&list_rwlock);
		
	return err;

}

void* remm_sync_thread_work(void* arg) {
	lsv_s32_t i = 0;
	lsv_s32_t err = 0;
	lsv_thread_t *thread = NULL;
        lsv_volume_proto_t *volume_proto = NULL;
	lsv_remm_info *remm_info = NULL;
        
	thread = (lsv_thread_t *) arg;
	volume_proto = (lsv_volume_proto_t *) thread->work_info;
	remm_info = (lsv_remm_info *)volume_proto->remm_info;
	
        thread->is_running = 1;
        sem_post(&thread->startsem);
      	DINFO("create remm thread: %u, id: %d\n", thread->seqno, remm_info->lsv_remm_id);

        while (1) {
		thread->is_working = 1;
		remm_sync_record_work(volume_proto);
		thread->is_working = 0;
		if (thread->to_shutdown) {
			goto EXIT;
		}
		usleep(5000);
        }
EXIT: 
	thread->is_working = 0;
	thread->is_running = 0;
	sem_post(&thread->stopsem);
	DINFO("remm thread leave:%u\n", thread->seqno);
	return NULL;
}


static inline int remm_thread_free(lsv_thread_t* thread) {
        if (thread) {
                sem_destroy(&thread->startsem);
                sem_destroy(&thread->stopsem);
                free(thread);
        }
        return 0;
}

lsv_s32_t remm_thread_init(lsv_volume_proto_t *volume_proto) {
        lsv_s32_t rc = 0;
        lsv_thread_t *thread = NULL;
	lsv_remm_info * remm_info = NULL;

	remm_info = (lsv_remm_info *)volume_proto->remm_info;
        thread = (lsv_thread_t*) malloc(sizeof(lsv_thread_t));
        if (NULL == thread) {
                rc = -ENOMEM;
                DERROR("malloc thread failed,errno:%d\n", rc);
                goto ERR;
        }
        thread->seqno = volume_proto->ino;
        thread->is_running = 0;
        thread->is_working = 0;
        thread->to_shutdown = 0;
        thread->work_info = volume_proto;
        sem_init(&thread->startsem, 0, 0);
        sem_init(&thread->stopsem, 0, 0);
        rc = pthread_create(&thread->thread_id, NULL, remm_process_thread_work, thread);
        if (rc) {
                DERROR("start thread error,errno:%d\n", rc);;
                goto ERR;
        }
        sem_wait(&thread->startsem);

        remm_info->thread = *thread;
        return 0;
        ERR: remm_thread_free(thread);
        return rc;
}

lsv_s32_t remm_thread_destroy(lsv_volume_proto_t * volume_proto) {
        lsv_thread_t * thread = &((lsv_remm_info *)volume_proto->remm_info)->thread;

        if (thread) {
                thread->to_shutdown = 1;
                sem_wait(&thread->stopsem);
                remm_thread_free(thread);
        }

        return 0;
}

lsv_s32_t remm_sync_thread_init(lsv_volume_proto_t *volume_proto) {
        lsv_s32_t rc = 0;
        lsv_thread_t *thread = NULL;
	lsv_remm_info * remm_info = NULL;

	remm_info = (lsv_remm_info *)volume_proto->remm_info;
        thread = (lsv_thread_t*) malloc(sizeof(lsv_thread_t));
        if (NULL == thread) {
                rc = -ENOMEM;
                DERROR("malloc thread failed,errno:%d\n", rc);
                goto ERR;
        }
        thread->seqno = volume_proto->ino;
        thread->is_running = 0;
        thread->is_working = 0;
        thread->to_shutdown = 0;
        thread->work_info = volume_proto;
        sem_init(&thread->startsem, 0, 0);
        sem_init(&thread->stopsem, 0, 0);
        rc = pthread_create(&thread->thread_id, NULL, remm_sync_thread_work, thread);
        if (rc) {
                DERROR("start thread error,errno:%d\n", rc);;
                goto ERR;
        }
        sem_wait(&thread->startsem);

        remm_info->sync_thread = *thread;
        return 0;
        ERR: remm_thread_free(thread);
        return rc;
}

lsv_s32_t remm_sync_thread_destroy(lsv_volume_proto_t * volume_proto) {
        lsv_thread_t * thread = &((lsv_remm_info *)volume_proto->remm_info)->sync_thread;

        if (thread) {
                thread->to_shutdown = 1;
                sem_wait(&thread->stopsem);
                remm_thread_free(thread);
        }

        return 0;
}
